/*******************************************************************************
 * Package: com.song.bigdata.v2
 * Type:    Aggregation
 * Date:    2024-09-19 10:31
 *
 * Copyright (c) 2024 LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.bigdata.v2;

import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * 功能描述： 聚合操作
 * 给wordCount 差不读
 * 分组 求和
 * @author Songxianyang
 * @date 2024-09-19 10:31
 */
public class Aggregation {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 录入数据 也就是数据源读取
        List<String> name = Lists.newArrayList("song", "song", "xian", "yang", "yang");
        // 创建流
        DataStreamSource<String> streamSource = env.fromCollection(name);
        // 扁平
        DataStream<Tuple2<String, Integer>> sum = streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                out.collect(new Tuple2<>(value, 1));
            }

            private static final long serialVersionUID = -4452613111738241686L;
        }).keyBy(key-> key.f0).sum(1);
        sum.print();
        env.execute();
    }

}
